package com.example.sql;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Registers a JDBC (MySQL) catalog in a Flink {@link TableEnvironment} and prints the names of
 * the databases returned by {@link TableEnvironment#listDatabases()}.
 *
 * <p>Connection settings can be overridden with the environment variables
 * {@code MYSQL_CATALOG_BASE_URL}, {@code MYSQL_CATALOG_DEFAULT_DATABASE},
 * {@code MYSQL_CATALOG_USERNAME} and {@code MYSQL_CATALOG_PASSWORD}. When a variable is unset
 * or empty, the value that was previously hard-coded in this class is used, so running without
 * any configuration behaves as before.
 *
 * <p>Author wangJinLong
 * <br>Date 2025/8/12 09:21
 */
public class FlinkSqlCatalog {

    /** Name under which the JDBC catalog is registered and then selected. */
    private static final String CATALOG_NAME = "mysql_catalog";

    // NOTE(review): these fallbacks keep the example runnable without configuration, but the
    // password default is a credential in source control; remove it once every environment
    // supplies MYSQL_CATALOG_PASSWORD.
    private static final String DEFAULT_DATABASE = "my_cdc";
    private static final String DEFAULT_USERNAME = "root";
    private static final String DEFAULT_PASSWORD = "mysql@123456";
    private static final String DEFAULT_BASE_URL = "jdbc:mysql://106.54.174.109:13306";

    /**
     * Program entry point.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        listCatalogDatabases();
    }

    /**
     * Creates a streaming-mode table environment, registers the JDBC catalog, switches to it
     * and writes each database name to standard output, one per line.
     */
    private static void listCatalogDatabases() {
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        String defaultDatabase = envOrDefault("MYSQL_CATALOG_DEFAULT_DATABASE", DEFAULT_DATABASE);
        String username = envOrDefault("MYSQL_CATALOG_USERNAME", DEFAULT_USERNAME);
        String password = envOrDefault("MYSQL_CATALOG_PASSWORD", DEFAULT_PASSWORD);
        String baseUrl = envOrDefault("MYSQL_CATALOG_BASE_URL", DEFAULT_BASE_URL);

        // With the default values this produces exactly the DDL text the class used before.
        String createCatalogDdl = "CREATE CATALOG " + CATALOG_NAME + " WITH(\n" +
                "    'type' = 'jdbc',\n" +
                "    'default-database' = " + sqlLiteral(defaultDatabase) + ",\n" +
                "    'username' = " + sqlLiteral(username) + ",\n" +
                "    'password' = " + sqlLiteral(password) + ",\n" +
                "    'base-url' = " + sqlLiteral(baseUrl) + "\n" +
                ");";
        tEnv.executeSql(createCatalogDdl);

        tEnv.useCatalog(CATALOG_NAME);

        for (String databaseName : tEnv.listDatabases()) {
            System.out.println(databaseName);
        }
    }

    /**
     * Returns the value of the given environment variable, or {@code fallback} when the
     * variable is unset or empty.
     *
     * @param name     environment variable name
     * @param fallback value to use when the variable provides nothing
     * @return the configured value or the fallback
     */
    private static String envOrDefault(String name, String fallback) {
        String value = System.getenv(name);
        return (value == null || value.isEmpty()) ? fallback : value;
    }

    /**
     * Wraps a value in single quotes for use as a SQL string literal, doubling any embedded
     * single quote so an externally supplied value cannot terminate the literal early.
     *
     * @param value raw option value
     * @return the quoted literal
     */
    private static String sqlLiteral(String value) {
        return "'" + value.replace("'", "''") + "'";
    }
}
